This library provides an integration with the Delta Lake storage framework.
Related Guides:
Storage location where Delta tables are stored.
The write mode passed to save the output.
Default Value: ‘overwrite’
Default Value: False
Engine passed to write_deltalake.
Default Value: ‘pyarrow’
Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Storage configuration for Amazon Web Services (AWS) S3 object store.
{
"access_key_id": null,
"secret_access_key": null,
"region": null,
"bucket": null,
"endpoint": null,
"token": null,
"imdsv1_fallback": false,
"virtual_hosted_style_request": null,
"unsigned_payload": null,
"checksum": null,
"metadata_endpoint": null,
"container_credentials_relative_uri": null,
"copy_if_not_exists": null,
"allow_unsafe_rename": null
}
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: False
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Storage configuration for local object store.
{}
Storage configuration for Google Cloud Storage object store.
{
"service_account": null,
"service_account_key": null,
"bucket": null,
"application_credentials": null
}
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Additional configuration passed to http client.
Default Value: None
Additional config and metadata added to table on creation.
Default Value: None
Name of the schema to use.
Default Value: None
Custom metadata that is added to transaction commit.
Default Value: None
Writer properties passed to the rust engine writer.
Default Value: None
Base class for an IO manager definition that reads inputs from and writes outputs to Delta Lake.
Examples
from dagster_deltalake import DeltaLakeIOManager
from dagster_deltalake_pandas import DeltaLakePandasTypeHandler
class MyDeltaLakeIOManager(DeltaLakeIOManager):
@staticmethod
def type_handlers() -> Sequence[DbTypeHandler]:
return [DeltaLakePandasTypeHandler()]
@asset(
key_prefix=["my_schema"] # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pd.DataFrame: # the name of the asset will be the table name
...
defs = Definitions(
assets=[my_table],
resources={"io_manager": MyDeltaLakeIOManager()}
)
If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O Manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”.
@op(
out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
...
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
# my_table will just contain the data from column "a"
...
Storage location where Delta tables are stored.
The write mode passed to save the output.
Default Value: ‘overwrite’
Default Value: False
Engine passed to write_deltalake.
Default Value: ‘pyarrow’
Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Storage configuration for Amazon Web Services (AWS) S3 object store.
{
"access_key_id": null,
"secret_access_key": null,
"region": null,
"bucket": null,
"endpoint": null,
"token": null,
"imdsv1_fallback": false,
"virtual_hosted_style_request": null,
"unsigned_payload": null,
"checksum": null,
"metadata_endpoint": null,
"container_credentials_relative_uri": null,
"copy_if_not_exists": null,
"allow_unsafe_rename": null
}
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: False
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Storage configuration for local object store.
{}
Storage configuration for Google Cloud Storage object store.
{
"service_account": null,
"service_account_key": null,
"bucket": null,
"application_credentials": null
}
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Additional configuration passed to http client.
Default Value: None
Additional config and metadata added to table on creation.
Default Value: None
Name of the schema to use.
Default Value: None
Custom metadata that is added to transaction commit.
Default Value: None
Writer properties passed to the rust engine writer.
Default Value: None
Base class for an IO manager definition that reads inputs from and writes outputs to Delta Lake.
Examples
from dagster_deltalake import DeltaLakeIOManager
from dagster_deltalake_pandas import DeltaLakePandasTypeHandler
class MyDeltaLakeIOManager(DeltaLakeIOManager):
@staticmethod
def type_handlers() -> Sequence[DbTypeHandler]:
return [DeltaLakePandasTypeHandler()]
@asset(
key_prefix=["my_schema"] # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pd.DataFrame: # the name of the asset will be the table name
...
defs = Definitions(
assets=[my_table],
resources={"io_manager": MyDeltaLakeIOManager()}
)
If you do not provide a schema, Dagster will determine a schema based on the assets and ops using the I/O Manager. For assets, the schema will be determined from the asset key, as in the above example. For ops, the schema can be specified by including a “schema” entry in output metadata. If none of these is provided, the schema will default to “public”.
@op(
out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
...
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
# my_table will just contain the data from column "a"
...
Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Storage configuration for Amazon Web Services (AWS) S3 object store.
{
"access_key_id": null,
"secret_access_key": null,
"region": null,
"bucket": null,
"endpoint": null,
"token": null,
"imdsv1_fallback": false,
"virtual_hosted_style_request": null,
"unsigned_payload": null,
"checksum": null,
"metadata_endpoint": null,
"container_credentials_relative_uri": null,
"copy_if_not_exists": null,
"allow_unsafe_rename": null
}
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: False
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Storage configuration for local object store.
{}
Storage configuration for Google Cloud Storage object store.
{
"service_account": null,
"service_account_key": null,
"bucket": null,
"application_credentials": null
}
Default Value: None
Default Value: None
Default Value: None
Default Value: None
Additional configuration passed to http client.
Default Value: None
Version to load delta table.
Default Value: None
Resource for interacting with a Delta table.
Examples
from dagster import Definitions, asset
from dagster_deltalake import DeltaTableResource, LocalConfig
@asset
def my_table(delta_table: DeltaTableResource):
df = delta_table.load().to_pandas()
defs = Definitions(
assets=[my_table],
resources={
"delta_table": DeltaTableResource(
url="/path/to/table",
storage_options=LocalConfig()
)
}
)